-
Notifications
You must be signed in to change notification settings - Fork 530
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
First cut at Dispatcher #1303
First cut at Dispatcher #1303
Conversation
Regarding where this lives, I feel like if this is going to be the spiritual successor to |
Possibly, but I kind of see the modules in the following way:
So from that standpoint, it's okay to stay in std, since everyone who wants it would already have this dependency. |
This is awesome. |
I'll take a look tomorrow |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok, I have no idea what's going on there so I can only look at it from a user's perspective 😅
|
||
import scala.concurrent.duration._ | ||
|
||
class DispatcherSpec extends BaseSpec { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Having some examples of this being used with a callback-driven API would be nice (not necessarily in the form of tests)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed. I think this needs some more documentation. Probably just scaladoc alone is sufficient.
@RaasAhsan The state machine is ready for a second review! |
7e8327c
to
a8b98bd
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A couple more super tiny comments :) I'm going to look at this more later, but 👍 for merging after these are addressed
F uncancelable { _ => | ||
for { | ||
// for catching race conditions where we finished before we were in the map | ||
completed <- F.ref(LongMap[Unit]()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My only gripe with this is that it can turn into a very slight memory leak. Basically, completed
won't be garbage collected until the last fiber in a batch terminates. I may be splitting hairs over this though, so feel free to rightfully ignore this :D
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is okay. It's a relatively small amount of state, and the only way to avoid this is to do Ref[Ref[LongMap[Unit]]
, but the outer Ref
still wouldn't be collected.
cancelToken = () => unsafeToFuture(token) | ||
|
||
// double-check to resolve race condition here | ||
if (canceled) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think there's a benign race condition here, but I'm going to mention it anyways:
- Unsafe cancel sets
canceled
totrue
. registerCancel
sets the cancel token here.- Unsafe cancel reads the
cancelToken
and runs it. registerCancel
readscanceled
to be true and runs the token.
The cancel token is pretty much just a fiber.cancel
, so I don't think we're violating correctness or leaking references, but it may just be worth fixing :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry took me a little while to scrape together the brain cells to understand this. :-) So I actually thought about that case when I was writing this! I didn't bother trying to resolve it because we already know cancel
to be idempotent, so I think we can leave it.
This is released as |
The idea here is actually pretty obvious in retrospect: a non-blocking supervised fiber-based dispatcher for sequencing effects. State is being managed by a pair of
AtomicReference
s: one of which contains an optional callback (and also encodes a double-check state to resolve races), while the other contains aLongMap
with the registered effects. Thedispatcher
fiber loops continuously, checking the state for new entries. If none are available, it creates anasync
node and registers the callback in thelatch
reference, double-checking the state to ensure that nothing was written to thestate
reference in the interim.If any new work is encountered, the dispatcher creates a fiber for each registered effect (which must be of type
F[Unit]
) and places that fiber into aRef
which is folded into the outerResource
scope, ensuring that child fibers are appropriately cleaned up when the dispatcher is shut down (note: here's a good and unavoidable use of thestart
function). Cancelation is carefully checked here.Note that any use of the
Runner
post-shutdown will result in deadlocks and memory leaks. I don't check for this at present.Anyway, on the calling side, all effects of type
F[E]
(for someE
) are wrapped in the appropriate machinery to populate aPromise
upon completion. Additionally, a bit of extra machinery is constructed to ensure that the state is unregistered if the cancelation function is called, or if the effect has already been started, that cancelation is then passed along to the carrier fiber.We don't actually need to fully encode a queue or anything like it since we can just spawn as many fibers as we want. When I realized this, I was able to greatly simplify the state machine. In abstract, it's really just an async latch which can be fired from impure code, together with a list of
F[Unit]
actions with callbacks to tie up cancelation.I want to add a few more things to the respective
RunnerPlatform
s, but this is pretty much ready for review.Resolves #1299